iT邦幫忙

2024 iThome 鐵人賽

DAY 20
1
生成式 AI

2024 年用 LangGraph 從零開始實現 Agentic AI System系列 第 20

【Day 20】- 結合 LangGraph 與 MongoDB 打造智慧工地安全監控系統:Agentic RAG 技術應用實例

  • 分享至 

  • xImage
  •  

摘要
這篇文章探討了如何將 LangGraph 的強大功能與 MongoDB 的資料儲存和檢索能力相結合,打造一個智慧的工地安全監控系統。文章首先介紹了 LangGraph 的 Checkpointer 機制,以及使用記憶體儲存方式的局限性。接著,文章重點介紹了如何使用 MongoDB 作為 Checkpointer 來解決這些問題,並展示了如何在 MongoDB 中儲存和管理工地安全事件資料。

文章的核心概念是 Agentic RAG,一種結合大語言模型和資訊檢索技術的人工智慧開發範式。透過使用 OpenAI 的嵌入模型將文字資料轉換為向量表示,並利用 MongoDB 的 $vectorSearch 運算子進行向量搜尋,系統可以理解使用者自然語言查詢,快速找到語意相關的安全事件記錄,並提供詳細的資訊。

文章最後展示了如何將 LangGraph 的工作流與 MongoDB 整合,讓系統能夠即時分析安全事件資料,持久化儲存對話歷史和系統狀態,並利用向量搜尋快速檢索相關資訊。透過這個整合,文章打造了一個智慧而高效的工地安全監控系統,並強調這種技術組合可以應用於許多不同的領域。

前言:延續 LangGraph 的探索

在上一篇文章中,我們深入探討了 LangGraph 中的 Checkpointer 機制,特別關注了如何使用 MemorySaver 來實現基本的對話歷史管理。這種方法為我們提供了一個簡單而有效的方式來在應用程式運行期間保存和檢索狀態資訊。

img

然而,隨著應用規模的擴大和需要處理的資料量的增加,記憶體存儲方式可能會面臨一些挑戰,比如:

  1. 資料持久性:當應用重啟時,記憶體中的資料會遺失。
  2. 儲存容量:記憶體儲存的容量有限,不適合長期儲存大量資料。
  3. 資料共享:多個實例或服務之間難以共享記憶體中的資料。

為了解決這些問題,我們需要探索更強大、更靈活的儲存解決方案。這就是為什麼在本篇文章中,我們將把注意力轉向外部資料庫,特別是 MongoDB。在接下來的內容中,我們將透過一個實際的案例 —— 工地安全監控系統 —— 來演示如何在 LangGraph 應用中整合和使用 MongoDB。

系統概述

想像一個場景:在一個繁忙的建築工地上,數百名工人、數十台重型機械同時運作。如何確保每個人的安全?如何及時發現並預防潛在的危險?這就是我們的智慧監控系統要解決的核心問題。
本系統的主要功能包括:
1.資料儲存與管理:利用 MongoDB 高效儲存和管理大量複雜的安全事件資料。
2.智慧分析:運用 LangGraph 框架進行自然語言處理和智慧決策。
3.互動式查詢:允許安全管理人員透過自然語言對話方式查詢和分析安全資訊。

Agentic RAG:智慧系統的新範式

Agentic RAG 是人工智慧應用開發的一個新興範式,它結合了大語言模型(LLM)的強大能力和資訊檢索技術。在我們的工地安全場景中,這意味著系統可以:

  • 理解並分解複雜的安全問題
  • 利用檢索器快速訪問相關的違規紀錄
  • 運用推理能力分析潛在風險
  • 提出具體的安全改進建議

技術深度:Agentic RAG 的核心

要理解 Agentic RAG,我們需要了解幾個關鍵概念:

  • AI 代理:這是一個能感知環境、進行決策並採取行動的計算實體。在我們的系統中,它可以是負責監控整個工地安全狀況的中央智慧單元。
  • 檢索器(Retriever):這是 RAG 系統的核心組件,負責從知識庫中檢索相關資訊。在工地安全系統中,它可以快速定位與特定安全問題相關的歷史資料、規章制度或最佳實踐。
  • 工具使用:先進的 LLM 模型具有使用各種工具的能力。在我們的系統中,這可能包括呼叫特定的資料分析函數或與其他系統互動。
  • 語意搜尋:這允許系統理解查詢的上下文和意圖,而不僅僅是關鍵字匹配。例如,理解「最近是否有人沒穿安全背心」這樣的複雜查詢。

1. 建構工地安全監控系統

假設情境如下:

位於台北市的「新北科技園區」正在進行一項大型建設專案,預計在三年內完成一座現代化的科技園區。為了確保工程進行期間的安全,專案管理團隊引入了一套先進的安全監控系統。 這套系統整合了多種技術,包括:

分佈在工地各處的高清攝影機 智能識別軟體 即時通報機制

系統的主要目標是預防事故發生,並在發生異常情況時能夠迅速反應。

2. 建構工地安全監控系統資料庫

2.1 創建模擬資料集

為了模擬真實的工地安全監控場景,我們創建了一個包含多個安全事件的資料集。這些事件涵蓋了各種安全問題,如設備故障、人員違規和環境危害等。

##數據欄位說明

  1. 事件ID: 唯一識別每個安全事件的編號,格式為 "INCxxxx"。
  2. 日期時間: 事件發生的具體日期和時間,格式為 "YYYY-MM-DD HH:MM:SS"。
  3. 事件類型: 描述發生的安全事件類型,例如:跌倒、設備故障、物體掉落等。
  4. 嚴重程度: 表示事件的嚴重程度,分為:輕微、中等、嚴重、危急。
  5. 地點: 事件發生的具體位置,包括區域和子區域,例如:"主建築區-3區"。
  6. 涉及人員: 事件中主要涉及的人員姓名。
  7. 人員角色: 涉及人員在工地的職務角色,如:施工人員、工程師、安全主管等。
  8. 攝影機ID: 記錄事件的攝影機編號,格式為 "CAMxxx"。
  9. 攝影機類型: 描述使用的攝影機類型,如:固定式攝影機、球型攝影機、熱感應攝影機。
  10. 異常事件類型: 如果適用,描述與事件相關的具體異常行為,如:未佩戴安全裝備、進入危險區域等。
  11. 處理人員: 負責處理或報告該事件的人員姓名。
  12. 影片來源: 記錄事件的視頻文件存儲位置,格式為 "/video_storage/YYYY/MM/DD/CAMxxx_YYYYMMDD_HHMMSS.mp4"。

範例如下

事件ID: INC0001
日期時間: 2024-02-24 15:32:35
事件類型: 火災
嚴重程度: 危急
地點: 機械操作區-3區
涉及人員: 曹彥廷
人員角色: 工程師
攝影機ID: CAM009
攝影機類型: 熱感應攝影機
異常事件類型: 違規操作
處理人員: 曹彥廷
影片來源: /video_storage/2024/02/24/CAM009_20240224_153235.mp4

解讀
2024年2月24日下午3點32分,在機械操作區-3區發生了一起嚴重的火災事件。涉及人員是工程師曹彥廷,可能由於違規操作引發。事件被 CAM009 號熱感應攝影機記錄,處理人員同樣是曹彥廷。相關的視頻證據存儲在指定的位置。

聲明:模擬工地現場存在 Schema,全程使用 Faker 套件製作,如果人名、地點、時間雷同,純屬巧合。

2.2 欄位示範

欄位截圖img

3. MongoDB 向量資料庫上傳與連接設定教學

在我們的 RAG 系統中,MongoDB 扮演著雙重角色:既是傳統的操作型資料庫,又是高效的向量資料庫。特別是 MongoDB Atlas,它為我們提供了一個強大的解決方案,能夠高效地儲存、查詢和檢索向量嵌入。

讓我們一步步來設定我們的 MongoDB Atlas 環境:

3.1 MongoDB Atlas 帳戶設定

  1. 首先,訪問 MongoDB Atlas 官網 註冊一個帳戶。如果您已有帳戶,直接登入即可。
  2. 按照官方指南,使用 Atlas UI 介面部署您的第一個叢集。選擇一個離您較近的伺服器位置,比如東亞地區,以確保最佳的連線速度。

成功建置 DB 後畫面
Caption: 成功建置 DB 逅畫面

3.2 創建資料庫和集合

在 Atlas 中,我們需要創建用於儲存安全事件資料的資料庫和集合:

  1. 創建一個名為 construction_safety_ai_use_case 的資料庫。
  2. 在該資料庫中,建立一個名為 safety_reports 的集合。

3.3 創建向量搜尋索引

為了支援高效的向量搜尋,我們需要為 safety_reports 集合創建一個特殊的索引:

  1. safety_reports 集合創建一個向量搜尋索引,命名為 safety_reports_vector_index
  2. 使用以下 JSON 定義創建索引:
{
  "fields": [
    {
      "numDimensions": 256,
      "path": "embedding",
      "similarity": "cosine",
      "type": "vector"
    }
  ]
}

這個索引將使我們的 RAG 應用能夠透過向量搜尋快速檢索相關記錄,為使用者查詢提供額外的上下文資訊。

3.4 建立資料庫連線

取得 MongoDB Atlas 叢集的連線 URI 後,我們可以使用 PyMongo 函式庫輕鬆建立連線:

import os
from pymongo import MongoClient

MONGO_URI = os.environ.get("MONGO_URI")
client = MongoClient(MONGO_URI)
db = client.get_database("construction_safety_ai_use_case")
collection = db.get_collection("safety_reports")

注意事項:

  1. 確保在設定資料庫存取權限時,仔細設定網路安全設定,只允許受信任的 IP 位址連線。
  2. 在實際應用中,將連線字串等敏感資訊儲存在環境變數或安全的設定檔中,避免直接寫入程式碼。

3.5 上傳資料到 MongoDB

移除任何集合,通常需要安置這個,可能會建立許多次

# Delete any existing records in the collection
collection.delete_many({})

將合併後 dataframe 上傳

documents = [row.to_dict() for _, row in df.iterrows()]
collection.insert_many(documents)
print("Data ingestion into MongoDB completed")

成功後你會在 MongoDB 獲得類似以下畫面
已有上傳資料在 MongoDB 截圖
Caption: 已有上傳資料在 MongoDB 截圖

透過以上步驟,我們就成功設定了 MongoDB 向量資料庫,為後續的 RAG 系統開發奠定了堅實的基礎。這個設定不僅支援傳統的資料操作,還能高效處理向量搜尋,這對於我們的工地安全監控系統至關重要。

如果在操作過程中遇到任何問題,可以參考 MongoDB 的官方文件或尋求技術支援。接下來,我們將探討如何利用這個強大的資料庫設定來實現我們的智慧安全監控系統。

4. 向量檢索功能實現

在我們的工地安全監控系統中,向量檢索功能是實現智慧查詢和分析的關鍵。這個功能允許我們基於語意相似性快速找到相關的安全事件記錄,為使用者提供精確而相關的資訊。讓我們逐步實現這個功能:

4.1 資料預處理和向量化

首先,我們需要將文字資料轉換為向量表示。這裡我們使用 OpenAI 的嵌入模型:首先,我们需要将文本数据转换为向量表示。这里我们使用 OpenAI 的嵌入模型:

from langchain_openai import OpenAIEmbeddings

OPEN_AI_EMBEDDING_MODEL = "text-embedding-3-small"
OPEN_AI_EMBEDDING_MODEL_DIMENSION = 256

embedding_model = OpenAIEmbeddings(model=OPEN_AI_EMBEDDING_MODEL, dimensions=OPEN_AI_EMBEDDING_MODEL_DIMENSION)

def get_embedding(text):
    return embedding_model.embed_query(text=text)

4.2 實現向量搜尋函數

接下來,我們實現一個函數來執行向量搜尋:

def vector_search(user_query, collection):
    query_embedding = get_embedding(user_query)
    
    pipeline = [
        {
            "$vectorSearch": {
                "index": "safety_reports_vector_index",
                "queryVector": query_embedding,
                "path": "embedding",
                "numCandidates": 100,
                "limit": 5
            }
        },
        {
            "$unset": "embedding"
        },
        {
            "$project": {
                "_id": 0,
                "事件ID": 1,
                "日期時間": 1,
                "違規事件類型": 1,
                "地點": 1,
                "違規人員": 1,
                "違反職安條款": 1,
                "score": {"$meta": "vectorSearchScore"}
            }
        }
    ]
    
    return list(collection.aggregate(pipeline))

這個函數執行以下操作:

  1. 將使用者查詢轉換為向量。
  2. 使用 MongoDB 的 $vectorSearch 運算子執行相似性搜尋。
  3. 移除結果中的嵌入向量以減少資料傳輸。
  4. 投影出我們需要的欄位,包括向量搜尋的相似度分數。

4.3 結果格式化函數

為了更好地展示搜尋結果,我們可以實現一個格式化函數:

def format_search_results(results):
    formatted_results = []
    for result in results:
        formatted_result = (
            f"事件ID: {result['事件ID']}\n"
            f"日期時間: {result['日期時間']}\n"
            f"違規事件類型: {result['違規事件類型']}\n"
            f"地點: {result['地點']}\n"
            f"違規人員: {result['違規人員']}\n"
            f"違反職安條款: {result['違反職安條款']}\n"
            f"相似度分數: {result['score']:.4f}\n"
        )
        formatted_results.append(formatted_result)
    return "\n".join(formatted_results)

4.4 測試向量檢索功能

最後,我們可以測試我們的向量檢索功能:

def test_vector_search(query):
    results = vector_search(query, collection)
    print(f"查詢: {query}\n")
    print(format_search_results(results))

# 測試查詢
test_queries = [
    "辦公區最近有什麼異常情況嗎?",
    "停車場有沒有什麼安全問題?",
    "有人在工作時沒戴安全裝備嗎?",
    "材料堆放區有什麼需要注意的嗎?",
]

for query in test_queries:
    test_vector_search(query)
    print("\n" + "="*50 + "\n")

這個測試函數允許我們輸入不同的查詢,並查看系統如何檢索相關的安全事件記錄。

截圖

透過實現這個向量檢索功能,我們的工地安全監控系統現在能夠:

  1. 理解使用者的自然語言查詢。
  2. 快速找到語意相關的安全事件記錄。
  3. 提供詳細且相關的資訊,幫助安全管理人員更有效地分析和預防潛在風險。

這個功能為我們的系統增加了強大的語意理解能力,使其不僅能夠基於關鍵字匹配來查找資訊,還能理解查詢的上下文和意圖,從而提供更加精準和有價值的結果。在下一節中,我們將探討如何將這個功能整合到我們的 LangGraph 工作流中,進一步增強系統的智慧性。

目前為止完成基礎的工作,整理資料及、建立向量資訊、存放資料庫、驗證檢索是否成功,接下來才要跟語言模型有關,在這之前都是 AI 沒啥關聯的XD

5. LangGraph 與 MongoDB 整合

在這一節中,我們將探討如何將 LangGraph 的強大功能與 MongoDB 的資料儲存和檢索能力相結合,創建一個智慧的工地安全監控系統。

5.1 MongoDB Checkpointer 實現

LangGraph 的一個關鍵特性是能夠保存和恢復狀態。為了利用 MongoDB 的持久化儲存能力,我們實現了一個自定義的 MongoDBSaver 類別:

import pickle
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Any, Dict, Optional, AsyncIterator, Union, List, Tuple

from langchain_core.runnables import RunnableConfig
from typing_extensions import Self

from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
    SerializerProtocol,
)
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
from motor.motor_asyncio import AsyncIOMotorClient
from datetime import datetime, timezone

class JsonPlusSerializerCompat(JsonPlusSerializer):
    def loads(self, data: bytes) -> Any:
        if data.startswith(b"\x80") and data.endswith(b"."):
            return pickle.loads(data)
        return super().loads(data)

class MongoDBSaver(AbstractContextManager, BaseCheckpointSaver):
    serde = JsonPlusSerializerCompat()

    client: AsyncIOMotorClient
    db_name: str
    collection_name: str

    def __init__(
        self,
        client: AsyncIOMotorClient,
        db_name: str,
        collection_name: str,
        *,
        serde: Optional[SerializerProtocol] = None,
    ) -> None:
        super().__init__(serde=serde)
        self.client = client
        self.db_name = db_name
        self.collection_name = collection_name
        self.collection = client[db_name][collection_name]

    # ... [其他方法實現]

    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: Optional[dict[str, Union[str, float, int]]]
    ) -> RunnableConfig:
        doc = {
            "thread_id": config["configurable"]["thread_id"],
            "thread_ts": checkpoint["id"],
            "checkpoint": self.serde.dumps(checkpoint),
            "metadata": self.serde.dumps(metadata),
        }
        if config["configurable"].get("thread_ts"):
            doc["parent_ts"] = config["configurable"]["thread_ts"]
        await self.collection.insert_one(doc)
        return {
            "configurable": {
                "thread_id": config["configurable"]["thread_id"],
                "thread_ts": checkpoint["id"],
            }
        }

這個 MongoDBSaver 類別允許 LangGraph 將狀態資訊儲存在 MongoDB 中,實現了跨會話的持久化和狀態恢復。

補充說明
在官方文件中有分成 sync 以及 async 的實作方式,可以參考這個連結,後來找到網友討論說 MongoDB 官方也有自己實現一版,可以參考這個 Repo,MongoDB 官方部落格也會不定時更新相關文章,強烈建議大家 Follow 起來。

5.2 定義工具和 AI 代理

為了使我們的系統能夠執行各種任務,我們定義了一系列工具和代理:

@tool
def list_safety_incidents(limit: int = 10, skip: int = 0, sort_by: str = "日期時間", sort_order: int = -1) -> str:
    # ... [實現程式碼]

@tool
def search_safety_incident(incident_id: str) -> str:
    # ... [實現程式碼]

@tool
def search_safety_incidents_by_type(event_type: str, limit: int = 5) -> str:
    # ... [實現程式碼]

safety_tools = [
    list_safety_incidents,
    search_safety_incident,
    search_safety_incidents_by_type,
]

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

def create_agent(llm, tools, system_message: str):
    # ... [實現程式碼]

safety_management_agent = create_agent(
    llm,
    safety_tools,
    system_message="""
    你是一個先進的安全事件管理分析助理(SEMA),專門負責工作場所的安全事件分析和管理。你的主要責任包括:
    1. 安全事件分析
    2. 支援安全決策
    3. 產生報告和分析
    ...
    """
)

這些工具和代理使我們的系統能夠執行複雜的安全分析任務。

5.3 構建 LangGraph 工作流

最後,我們使用 LangGraph 構建一個工作流,將所有組件整合在一起:

from langgraph.graph import END, StateGraph
from langgraph.prebuilt import tools_condition

workflow = StateGraph(AgentState)

workflow.add_node("chatbot", chatbot_node)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("chatbot")
workflow.add_conditional_edges(
    "chatbot",
    tools_condition,
    {"tools": "tools", END: END}
)

workflow.add_edge("tools", "chatbot")

mongo_client = AsyncIOMotorClient(MONGO_URI)
mongodb_checkpointer = MongoDBSaver(mongo_client, DB_NAME, "state_store")

graph = workflow.compile(checkpointer=mongodb_checkpointer)

上 LangGRaph 視覺圖

這個工作流定義了系統的行為邏輯,包括如何處理使用者輸入、何時使用工具,以及如何產生回應。

5.4 運行對話循環

為了與使用者互動,我們實現了一個非同步的對話循環:

import asyncio
from langchain_core.messages import HumanMessage, AIMessage
import time

async def chat_loop():
    config = {"configurable": {"thread_id": "10"}}
    print("可以輸入 'quit', 'exit', 'q' 來結束對話")

    while True:
        user_input = await asyncio.get_event_loop().run_in_executor(None, input, "使用者: ")
        if user_input.lower() in ["quit", "exit", "q"]:
            print("再見!")
            break

        sanitized_name = sanitize_name("Human") or "匿名"
        state = {"messages": [HumanMessage(content=user_input, name=sanitized_name)]}

        print("助理: ", end="", flush=True)

        max_retries = 3
        retry_delay = 1

        for attempt in range(max_retries):
            try:
                async for chunk in graph.astream(state, config, stream_mode="values"):
                    # ... [處理回應]
                break
            except Exception as e:
                # ... [錯誤處理]

        print("\n")

await chat_loop()

img

透過這種整合,我們創建了一個強大的工地安全監控系統,它能夠:

  1. 即時分析安全事件資料
  2. 持久化儲存對話歷史和系統狀態
  3. 利用向量搜尋快速檢索相關資訊

總結

在這篇文章中,我們深入探討了如何在 LangGraph 中使用 MongoDB 作為 Checkpointer。我們選擇了工地安全監控系統作為例子,不僅因為它很實用,更是為了讓大家更容易理解這些技術概念。
透過這個例子,我們看到了 MongoDB 作為 Checkpointer 的強大之處。它不僅能夠保存對話的狀態,還能在需要時輕鬆恢復,這對於構建複雜的對話系統來說真的很重要。我們還看到了如何將 LangGraph 和 MongoDB 結合起來,創造出一個既智慧又高效的系統。

雖然我們用的是工地安全這個例子,但其實這種技術組合可以用在很多不同的領域。我們希望透過這篇文章,能給大家一些啟發,看看如何在自己的專案中運用這些技術。無論你是在開發什麼樣的系統,只要涉及到複雜的對話和資料管理,這種方法都可能對你有所幫助。

總的來說,我們希望這篇文章不僅讓你了解了如何使用 MongoDB 作為 Checkpointer,還能激發你在自己的專案中嘗試類似的技術組合。

即刻前往教學程式碼 Repo,親自動手實作工安監控系統吧!別忘了給專案按個星星並持續關注更新,讓我們一起探索AI代理的新境界。

碎碎念時間:這邊實作弄好之後,MongoDB 也剛好弄了一篇工地安全助理的教學程式碼,資料集不同之外,邏輯大致上相同,也可以看看這份檔案

X. 參考資料

  1. 官方文件
  2. MongoDB Blog

上一篇
【Day 19】- LangGraph 的記憶機制:提升 AI 助理的上下文理解能力
下一篇
【Day 21】- 從基礎到進階: 掌握RAG基礎並使用LangGraph實現Agentic RAG
系列文
2024 年用 LangGraph 從零開始實現 Agentic AI System31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
hlb
iT邦新手 5 級 ‧ 2024-11-07 18:12:53

有沒有出什麼大包 -> 最近沒有與包裹相關的安全事件報告 /images/emoticon/emoticon37.gif

我要留言

立即登入留言